package com.paul.mq.dependencies.mqtt.emqx;//package com.ruoyi.mq.dependencies.mqtt;
//
//import lombok.extern.slf4j.Slf4j;
//import org.eclipse.paho.client.mqttv3.*;
//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//import org.springframework.boot.ApplicationArguments;
//import org.springframework.context.annotation.Bean;
//import org.springframework.stereotype.Component;
//
//import java.nio.charset.StandardCharsets;
//import java.util.Arrays;
//import java.util.Random;
//import java.util.concurrent.Executors;
//import java.util.concurrent.ScheduledExecutorService;
//import java.util.concurrent.TimeUnit;
//
///**
// * @author zmk
// * @date 2021/1/11 15:42
// * @description
// */
//@Slf4j
//@Component
//public class MqttConsumerClient {
//
//    //多个topic，使用, 隔开
//    static String subTopic = "testtopic1/1";
//    static String pubTopic = "testtopic/1";
//    String content = "Hello World";
//    static int initQos = 1;
//    String broker = "tcp://47.100.210.120:1883";
//    String clientId = "client";
//
//    String userName = "admin";
//    String passWord = "kj123456!@#";
//
//    int outTime = 10;
//    int keepTime = 20;
//
//    static MqttClient mqttClient;
//    MqttConnectOptions connOpts;
//
//
//    @Bean("mqttClient")
//    public void run() throws Exception {
//        log.info("初始化并启动mqtt......");
//        connect();
//    }
//
//
//    private void connect() {
//        try {
//            // 1 创建客户端
//            getClient();
//            // 2 设置配置
//            getOption();
//            // 3 消息发布质量
//            String[] topic = subTopic.split(",");
//            int[] qos = getQos(topic.length);
//            // 4 最后设置
//            create(connOpts, topic, qos);
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//    }
//
//    /**
//     * 创建客户端
//     */
//    public void getClient() {
//        try {
//            if (null == mqttClient) {
//                //host为主机名，clientid即连接MQTT的客户端ID，一般以唯一标识符表示，
//                // MemoryPersistence设置clientid的保存形式，默认为以内存保存
//                mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
//            }
//            log.info("--创建mqtt客户端");
//        } catch (Exception e) {
//            log.error("创建mqtt客户端异常：" + e);
//        }
//    }
//
//    /**
//     * 生成配置对象，用户名，密码等
//     */
//    private void getOption() {
//        //MQTT连接设置
//        connOpts = new MqttConnectOptions();
//        //设置是否清空session,false表示服务器会保留客户端的连接记录，true表示每次连接到服务器都以新的身份连接
//        connOpts.setCleanSession(true);
//        //设置连接的用户名
//        connOpts.setUserName(userName);
//        //设置连接的密码
//        connOpts.setPassword(passWord.toCharArray());
//        //设置超时时间 单位为秒
//        connOpts.setConnectionTimeout(outTime);
//        //设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线，但这个方法并没有重连的机制
//        connOpts.setKeepAliveInterval(keepTime);
//        connOpts.setAutomaticReconnect(true);
//        //setWill方法，如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
////            option.setWill(topic, "close".getBytes(), 2, true);
//    }
//
//    /**
//     * qos
//     */
//    public int[] getQos(int length) {
//
//        int[] qos = new int[length];
//        for (int i = 0; i < length; i++) {
//            /**
//             *  MQTT协议中有三种消息发布服务质量:
//             *
//             * QOS0： “至多一次”，消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况，环境传感器数据，丢失一次读记录无所谓，因为不久后还会有第二次发送。
//             * QOS1： “至少一次”，确保消息到达，但消息重复可能会发生。
//             * QOS2： “只有一次”，确保消息到达一次。这一级别可用于如下情况，在计费系统中，消息重复或丢失会导致不正确的结果，资源开销大
//             */
//            qos[i] = initQos;
//        }
//        log.info("--设置消息发布质量");
//        return qos;
//    }
//
//    /**
//     * 装在各种实例和订阅主题
//     */
//    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
//        try {
//            mqttClient.setCallback(new PushCallBack(mqttClient, options, topic, qos));
//            log.info("--添加回调处理类");
//            mqttClient.connect(options);
//        } catch (Exception e) {
//            log.info("装载实例或订阅主题异常：" + e);
//        }
//    }
//
//    /**
//     * 订阅某个主题
//     *
//     * @param topic
//     * @param qos
//     */
//    public void subscribe(String topic, int qos) {
//        try {
//            log.info("topic:" + topic);
//            mqttClient.subscribe(topic, qos);
//        } catch (MqttException e) {
//            e.printStackTrace();
//        }
//    }
//
//    /**
//     * 发布，非持久化
//     * qos根据文档设置为1
//     *
//     * @param topic
//     * @param msg
//     */
//    public static void publish(String topic, String msg) {
//        publish(1, false, topic, msg);
//    }
//
//    /**
//     * 发布
//     */
//    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
//        MqttMessage message = new MqttMessage();
//        message.setQos(qos);
//        message.setRetained(retained);
//        message.setPayload(pushMessage.getBytes());
//        MqttTopic mTopic = mqttClient.getTopic(topic);
//        if (null == mTopic) {
//            log.error("topic：" + topic + " 不存在");
//        }
//        MqttDeliveryToken token;
//        try {
//            token = mTopic.publish(message);
//            token.waitForCompletion();
//
//            if (!token.isComplete()) {
//                log.info("消息发送成功");
//            }
//        } catch (MqttPersistenceException e) {
//            e.printStackTrace();
//        } catch (MqttException e) {
//            e.printStackTrace();
//        }
//    }
//
//
//    public class PushCallBack implements MqttCallbackExtended {
//        private MqttClient client;
//        private MqttConnectOptions options;
//        private String[] topic;
//        private int[] qos;
//
//        public PushCallBack(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
//            this.client = client;
//            this.options = options;
//            this.topic = topic;
//            this.qos = qos;
//        }
//
//        /**
//         * 断开重连
//         */
//        @Override
//        public void connectionLost(Throwable cause) {
//            log.info("MQTT连接断开，发起重连......");
//            try {
//                if (null != client && !client.isConnected()) {
//                    client.reconnect();
//                    log.error("尝试重新连接");
//                } else {
//                    client.connect(options);
//                    log.error("尝试建立新连接");
//                }
//            } catch (Exception e) {
//                e.printStackTrace();
//            }
//        }
//
//        /**
//         * 消息处理
//         */
//        @Override
//        public void messageArrived(String topic, MqttMessage message) throws Exception {
//            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
//            log.info("收到topic:" + topic + " 消息：" + msg);
//            try {
////                        consumerMessageManager.doOnMessage(result);
//                publish(pubTopic, clientId + content + new Random().nextInt());
//            } catch (Exception e) {
//                log.info("处理mqtt消息异常:" + e);
//            }
//        }
//
//        /**
//         * 接收到消息调用令牌中调用
//         */
//        @Override
//        public void deliveryComplete(IMqttDeliveryToken token) {
//            System.out.println("deliveryComplete---------" + token.isComplete());
//            log.info("deliveryComplete---------" + Arrays.toString(topic));
//        }
//
//        /**
//         * mqtt连接后订阅主题
//         */
//        @Override
//        public void connectComplete(boolean b, String s) {
//            try {
//                if (null != topic && null != qos) {
//                    if (client.isConnected()) {
//                        client.subscribe(topic, qos);
//                        log.info("mqtt连接成功，客户端ID：" + clientId);
//                        log.info("--订阅主题:：" + Arrays.toString(topic));
//                    } else {
//                        log.info("mqtt连接失败，客户端ID：" + clientId);
//                    }
//                }
//            } catch (Exception e) {
//                log.info("mqtt订阅主题异常:" + e);
//            }
//        }
//    }
//}
